
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing #20087

Closed · wants to merge 59 commits

Conversation

fjh100456
Contributor

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing

What changes were proposed in this pull request?

Pass ‘spark.sql.parquet.compression.codec’ value to ‘parquet.compression’.
Pass ‘spark.sql.orc.compression.codec’ value to ‘orc.compress’.
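For illustration only, a minimal Scala sketch of the idea with a hypothetical helper name (the PR's real logic lives in `HiveOptions.getHiveWriteCompression` and `SaveAsHiveFile`): the session-level Spark codec is mapped onto the Hive table property for the file format, so the writer picks it up through `hadoopConf`.

```
// Minimal sketch, not the PR's actual code: map the session-level Spark codec
// onto the Hive serde property for the table's file format.
def hiveWriteCompression(format: String, sessionCodec: String): Option[(String, String)] =
  format.toLowerCase match {
    case f if f.contains("parquet") => Some("parquet.compression" -> sessionCodec.toUpperCase)
    case f if f.contains("orc")     => Some("orc.compress" -> sessionCodec.toUpperCase)
    case _                          => None
  }

// hiveWriteCompression("parquet", "gzip") == Some("parquet.compression" -> "GZIP")
```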

How was this patch tested?

Add test.

Note:
This is the same issue mentioned in #19218. That branch was deleted by mistake, so I am opening a new PR instead.

@gatorsmile @maropu @dongjoon-hyun @discipleforteen

…'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered.

## What changes were proposed in this pull request?
1. Also acquire 'compressionCodecClassName' from `parquet.compression`; the lookup order is `compression`, then `parquet.compression`, then `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`.
2. Change `spark.sql.parquet.compression.codec` to accept "none". `ParquetOptions` already treats "none" as equivalent to "uncompressed", but the config did not allow it to be set to "none".
3. Rename `compressionCode` to `compressionCodecClassName`.

## How was this patch tested?
Manual test.
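For illustration, a minimal Scala sketch of that lookup order, with a hypothetical helper name (not the exact `ParquetOptions` code):

```
// Minimal sketch of the lookup order: the `compression` option wins, then the
// `parquet.compression` table property, then the session-level SQL config.
def resolveParquetCodec(parameters: Map[String, String], sessionCodec: String): String =
  parameters.get("compression")
    .orElse(parameters.get("parquet.compression"))
    .getOrElse(sessionCodec)
    .toLowerCase

// resolveParquetCodec(Map("parquet.compression" -> "GZIP"), "snappy") == "gzip"
```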
@gatorsmile
Member

ok to test

@@ -68,6 +68,10 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
.get("mapreduce.output.fileoutputformat.compress.type"))
}

// Set compression by priority
HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
.foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
Member

Will this not be affected by hive.exec.compress.output? Could you investigate the relation between this setting and the code from line 57 to line 69?

Contributor Author

For Parquet, without the changes in this PR, the precedence is table-level compression > mapreduce.output.fileoutputformat.compress; spark.sql.parquet.compression.codec never takes effect. With this PR, mapreduce.output.fileoutputformat.compress no longer takes effect; instead, spark.sql.parquet.compression.codec always takes effect when there is no table-level compression.

For ORC, hive.exec.compress.output does not take effect, as explained in the code comments.

Shall we keep this precedence for Parquet? If so, how do we deal with ORC?
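For illustration, a minimal sketch of the resulting behavior, assuming `spark` is a Hive-enabled SparkSession and the table names are hypothetical:

```
// Session-level codec applies when no table-level codec is set.
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
spark.sql("CREATE TABLE t_session (id INT) STORED AS PARQUET")
spark.sql("INSERT INTO t_session VALUES (1)")   // written with GZIP

// A table-level codec in TBLPROPERTIES takes precedence over the session config.
spark.sql(
  "CREATE TABLE t_table (id INT) STORED AS PARQUET " +
  "TBLPROPERTIES ('parquet.compression'='SNAPPY')")
spark.sql("INSERT INTO t_table VALUES (1)")     // written with SNAPPY
```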

@SparkQA

SparkQA commented Dec 30, 2017

Test build #85539 has finished for PR 20087 at commit ee0c558.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 2, 2018

Test build #85582 has finished for PR 20087 at commit e9f705d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 2, 2018

Test build #85583 has finished for PR 20087 at commit d3aa7a0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

advancedxy and others added 10 commits January 2, 2018 23:30
## What changes were proposed in this pull request?
stageAttemptId was added to TaskContext, together with the corresponding constructor changes.

## How was this patch tested?
Added a new test in TaskContextSuite, two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages

Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)

Author: Xianjin YE <advancedxy@gmail.com>

Closes apache#20082 from advancedxy/SPARK-22897.

(cherry picked from commit a6fc300)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

Assert if code tries to access SQLConf.get on executor.
This can lead to hard-to-detect bugs, where the executor reads fallbackConf, falling back to default config values and ignoring potentially changed non-default configs.
If a config is to be passed to executor code, it needs to be read on the driver, and passed explicitly.
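For illustration, a minimal sketch of the intended pattern, assuming `spark` is an existing SparkSession:

```
// Read the config once on the driver and capture the value in the closure,
// instead of calling SQLConf.get inside executor-side code.
val numShufflePartitions = spark.conf.get("spark.sql.shuffle.partitions").toInt  // driver side

val bucketed = spark.sparkContext
  .parallelize(1 to 10)
  .map(_ % numShufflePartitions)  // executors only see the captured Int
  .collect()
```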

## How was this patch tested?

Check in existing tests.

Author: Juliusz Sompolski <julek@databricks.com>

Closes apache#20136 from juliuszsompolski/SPARK-22938.

(cherry picked from commit 247a089)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… TABLE SQL statement

## What changes were proposed in this pull request?
Currently, our CREATE TABLE syntax requires the EXACT order of clauses. It is pretty hard to remember the exact order. Thus, this PR makes the optional clauses order-insensitive for the `CREATE TABLE` SQL statement.

```
CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1 col_type1 [COMMENT col_comment1], ...)]
    USING datasource
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```

The same idea is also applicable to Create Hive Table.
```
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1[:] col_type1 [COMMENT col_comment1], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
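For illustration, a minimal sketch of what becomes possible, assuming `spark` is a SparkSession and the table names are hypothetical: both statements parse even though the optional clauses appear in different orders.

```
spark.sql("""
  CREATE TABLE t_a (id INT, p STRING)
  USING parquet
  PARTITIONED BY (p)
  COMMENT 'order A'
  TBLPROPERTIES ('k' = 'v')
""")

spark.sql("""
  CREATE TABLE t_b (id INT, p STRING)
  USING parquet
  TBLPROPERTIES ('k' = 'v')
  COMMENT 'order B'
  PARTITIONED BY (p)
""")
```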

## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#20133 from gatorsmile/createDataSourceTableDDL.

(cherry picked from commit 1a87a16)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source and Hive tables.

data source table: delete all partition directories that match the static partition values provided in the insert statement.

hive table: only delete partition directories that have data written into them.

This PR adds a new config to let users choose Hive's behavior.
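For illustration, a minimal sketch assuming the new config is `spark.sql.sources.partitionOverwriteMode` and the table names are hypothetical:

```
// Opt in to Hive-style dynamic overwrite: only partitions that actually
// receive data are replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

spark.sql("""
  INSERT OVERWRITE TABLE sales PARTITION (dt)
  SELECT amount, dt FROM staging_sales
""")
```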

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#18714 from cloud-fan/overwrite-partition.

(cherry picked from commit a66fe36)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Add a `reset` function to ensure the state in `AnalysisContext` is per-query.

## How was this patch tested?
The existing test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#20127 from gatorsmile/refactorAnalysisContext.
## What changes were proposed in this pull request?

* String interpolation in ml pipeline example has been corrected as per scala standard.

## How was this patch tested?
* manually tested.

Author: chetkhatri <ckhatrimanjal@gmail.com>

Closes apache#20070 from chetkhatri/mllib-chetan-contrib.

(cherry picked from commit 9a2b65a)
Signed-off-by: Sean Owen <sowen@cloudera.com>
## What changes were proposed in this pull request?

Move `ColumnVector` and related classes to `org.apache.spark.sql.vectorized`, and improve the documentation.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#20116 from cloud-fan/column-vector.

(cherry picked from commit b297029)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

`FoldablePropagation` is a little tricky as it needs to handle attributes that are mis-derived from children, e.g. outer join outputs. This rule does a kind of stoppable tree transform: it stops applying the rule when it hits a node that may have mis-derived attributes.

Logically we should be able to apply this rule above the unsupported nodes, by just treating the unsupported nodes as leaf nodes. This PR improves the rule so that it does not stop the tree transformation, but instead reduces the set of foldable expressions that we want to propagate.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#20139 from cloud-fan/foldable.

(cherry picked from commit 7d045c5)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…rigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes apache#20129 from felixcheung/rwater.

(cherry picked from commit df95a90)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
## What changes were proposed in this pull request?

ChildFirstClassLoader's parent is set to null, so we can't get jars from its parent. This will cause a ClassNotFoundException during HiveClient initialization with the built-in Hive jars, where we should probably use the Spark context class loader instead.

## How was this patch tested?

add new ut
cc cloud-fan gatorsmile

Author: Kent Yao <yaooqinn@hotmail.com>

Closes apache#20145 from yaooqinn/SPARK-22950.

(cherry picked from commit 9fa703e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@fjh100456
Contributor Author

@gatorsmile
I'd like to change the precedence:
For Parquet, if hive.exec.compress.output is true, keep the old precedence; otherwise, get the compression from HiveOptions.
For ORC, because hive.exec.compress.output never takes effect, we get the compression from HiveOptions directly.

If this is OK, I'll change the test cases. Any suggestions?

@SparkQA

SparkQA commented Jan 11, 2018

Test build #85944 has finished for PR 20087 at commit 4b89b44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -61,7 +61,7 @@ class OrcOptions(

object OrcOptions {
// The ORC compression short names
private val shortOrcCompressionCodecNames = Map(
val shortOrcCompressionCodecNames = Map(
Member

Instead of changing the access modifiers, add a public function

def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)

@@ -76,7 +76,7 @@ object ParquetOptions {
val MERGE_SCHEMA = "mergeSchema"

// The parquet compression short names
private val shortParquetCompressionCodecNames = Map(
val shortParquetCompressionCodecNames = Map(
Member

The same here.

}
}

private val maxRecordNum = 500
Member

Reduce it to 50 to decrease the execution time.

block <- footer.getParquetMetadata.getBlocks.asScala
column <- block.getColumns.asScala
} yield column.getCodec.name()
case "orc" => new File(path).listFiles().filter{ file =>
Member

Nit: add a space before {

tableName: String,
partition: Option[String]): Unit = {
val partitionInsert = partition.map(p => s"partition (p='$p')").mkString
sql(
Member

This is INSERT after CREATE TABLE. We also need to test/fix another common case: CTAS (CREATE TABLE AS SELECT).

Contributor Author

A CTAS statement is not allowed to create a partitioned table using Hive's file formats, so I use the CREATE TABLE tableName USING ... OPTIONS (...) PARTITIONED BY ... syntax to create the table (a sketch of the syntax is below).

However, it seems to behave differently from a non-partitioned Hive table when convertMetastore is true. For a non-partitioned Hive table the session-level compression takes effect, but for a table created by CTAS the table-level compression takes effect.

If I merge the code of your PR (#20120), they become consistent: table-level compression takes effect.

Should I fix it after your PR is closed?
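For illustration, a minimal sketch of the fallback syntax, with hypothetical table and column names:

```
// CTAS on a Hive-format partitioned table is rejected, so the test uses the
// data source syntax, which allows PARTITIONED BY together with AS SELECT.
spark.sql("""
  CREATE TABLE t_ctas
  USING parquet
  OPTIONS (compression = 'gzip')
  PARTITIONED BY (p)
  AS SELECT id, p FROM src
""")
```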

Member

We can merge this PR first. Will ping you when my PR is fixed. Thanks!

val compression = Option(tableCompression)
checkCompressionCodecForTable(format, isPartitioned, compression) {
case (realCompressionCodec, tableSize) => assertionCompressionCodec(compression,
sessionCompressionCodec, realCompressionCodec, tableSize)
Member

case (realCompressionCodec, tableSize) =>
  assertionCompressionCodec(
    compression, sessionCompressionCodec, realCompressionCodec, tableSize)

isPartitioned,
convertMetastore,
compressionCodecs = compressCodecs,
tableCompressionCodecs = compressCodecs) {
Member

Also add another scenario.

compressionCodecs = Nil,
tableCompressionCodecs = compressCodecs 

Contributor Author

Do you mean only table-level compression? Actually, compressionCodecs is the session-level codec; even if it is set to Nil or null here, it still takes the default value snappy.
Isn't that already covered by the first test case, test("both table-level and session-level compression are set")?

Member

OK.

isPartitioned,
convertMetastore,
compressionCodecs = compressCodecs,
tableCompressionCodecs = List(null)) {
Member

List(null) -> Nil

Contributor Author

If we change it to Nil, the following function may require special handling for this situation. Setting it to null is how we get a None. Can we keep it?

private def checkTableCompressionCodecForCodecs(
      format: String,
      isPartitioned: Boolean,
      convertMetastore: Boolean,
      compressionCodecs: List[String],
      tableCompressionCodecs: List[String])
      (assertionCompressionCodec: (Option[String], String, String, Long) => Unit): Unit = {
    withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString) {
      tableCompressionCodecs.foreach { tableCompression =>
        compressionCodecs.foreach { sessionCompressionCodec =>
          withSQLConf(getSparkCompressionConfName(format) -> sessionCompressionCodec) {
            // 'tableCompression = null' means no table-level compression
            val compression = Option(tableCompression)
            checkCompressionCodecForTable(format, isPartitioned, compression) {
              case (realCompressionCodec, tableSize) =>
                assertionCompressionCodec(
                  compression, sessionCompressionCodec, realCompressionCodec, tableSize)
            }
          }
        }
      }
    }
  }

Member

It does not work?

Contributor Author

If it is set to Nil, the foreach loop body is never entered and the test case does nothing, so special handling would be needed. Setting it to null yields a None via val compression = Option(tableCompression), which is exactly what I want.
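A minimal Scala illustration of that point:

```
// A single null element makes the loop run once with Option(null) == None,
// whereas Nil skips the loop body entirely.
val withNull: List[String] = List(null)
withNull.foreach { c => assert(Option(c).isEmpty) }  // runs once, compression = None

val empty: List[String] = Nil
empty.foreach { _ => sys.error("never reached") }    // body never runs
```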

val relCompressionCodecs =
if (isPartitioned) compressCodecs.flatMap { codec =>
getTableCompressionCodec(s"$tablePath/p=$codec", format)
} else getTableCompressionCodec(tablePath, format)
Member

} else {
  getTableCompressionCodec(tablePath, format)
}

createTable(tmpDir, tableName, isPartitioned, format, None)
withTable(tableName) {
compressCodecs.foreach { compressionCodec =>
val partition = if (isPartitioned) Some(compressionCodec) else None
Member

partition -> partitionValue

@fjh100456
Contributor Author

@gatorsmile
I have fixed the test case for CTAS, but it may not pass until the code of your PR #20120 is merged.

@SparkQA

SparkQA commented Jan 19, 2018

Test build #86383 has finished for PR 20087 at commit 99271d6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

compressionCodecs = compressCodecs,
tableCompressionCodecs = compressCodecs) {
case
(tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
Member

case (tableCodec, sessionCodec, realCodec, tableSize) =>

def checkForTableWithCompressProp(format: String, compressCodecs: List[String]): Unit = {
Seq(true, false).foreach { isPartitioned =>
Seq(true, false).foreach { convertMetastore =>
Seq(true, false).foreach { usingCTAS =>
Member

Let us disable this. We can merge this PR first

// TODO: Also verify CTAS cases when the bug is fixed.
Seq(false).foreach { usingCTAS =>

@@ -82,4 +82,7 @@ object ParquetOptions {
"snappy" -> CompressionCodecName.SNAPPY,
"gzip" -> CompressionCodecName.GZIP,
"lzo" -> CompressionCodecName.LZO)

def getParquetCompressionCodecName(name: String): String =
shortParquetCompressionCodecNames(name).name()
Member

def getParquetCompressionCodecName(name: String): String = {
  shortParquetCompressionCodecNames(name).name()
}

// Always expect session-level take effect
assert(sessionCompressionCodec == realCompressionCodec)
assert(checkTableSize(format, sessionCompressionCodec,
isPartitioned, convertMetastore, usingCTAS, tableSize))
Member

assert(checkTableSize(
  format, sessionCompressionCodec, isPartitioned, convertMetastore, usingCTAS, tableSize))

compressionCodecs = compressCodecs,
tableCompressionCodecs = List(null)) {
case
(tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
Member

The same here.

@gatorsmile
Member

@fjh100456 Thanks for working on it! It is pretty close to being merged.

compressionCodecs = compressCodecs,
tableCompressionCodecs = List(null)) {
case
(tableCodec, sessionCodec, realCodec, tableSize) =>
Member

Nit: the style issue.

Contributor Author

Oops, I made a mistake. Thank you!

@SparkQA

SparkQA commented Jan 20, 2018

Test build #86409 has finished for PR 20087 at commit 5b5e1df.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 20, 2018

Test build #86411 has finished for PR 20087 at commit 118f788.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Jan 20, 2018

Test build #86415 has finished for PR 20087 at commit 118f788.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

LGTM

Thanks! Merged to master/2.3

asfgit pushed a commit that referenced this pull request Jan 20, 2018
…rk.sql.orc.compression.codec' configuration doesn't take effect on hive table writing

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing

What changes were proposed in this pull request?

Pass ‘spark.sql.parquet.compression.codec’ value to ‘parquet.compression’.
Pass ‘spark.sql.orc.compression.codec’ value to ‘orc.compress’.

How was this patch tested?

Add test.

Note:
This is the same issue mentioned in #19218 . That branch was deleted mistakenly, so make a new pr instead.

gatorsmile maropu dongjoon-hyun discipleforteen

Author: fjh100456 <fu.jinhua6@zte.com.cn>
Author: Takeshi Yamamuro <yamamuro@apache.org>
Author: Wenchen Fan <wenchen@databricks.com>
Author: gatorsmile <gatorsmile@gmail.com>
Author: Yinan Li <liyinan926@gmail.com>
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Juliusz Sompolski <julek@databricks.com>
Author: Felix Cheung <felixcheung_m@hotmail.com>
Author: jerryshao <sshao@hortonworks.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Gera Shegalov <gera@apache.org>
Author: chetkhatri <ckhatrimanjal@gmail.com>
Author: Joseph K. Bradley <joseph@databricks.com>
Author: Bago Amirbekian <bago@databricks.com>
Author: Xianjin YE <advancedxy@gmail.com>
Author: Bruce Robbins <bersprockets@gmail.com>
Author: zuotingbing <zuo.tingbing9@zte.com.cn>
Author: Kent Yao <yaooqinn@hotmail.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Adrian Ionescu <adrian@databricks.com>

Closes #20087 from fjh100456/HiveTableWriting.

(cherry picked from commit 00d1691)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@asfgit closed this in 00d1691 on Jan 20, 2018
// For ORC,"mapreduce.output.fileoutputformat.compress",
// "mapreduce.output.fileoutputformat.compress.codec", and
// "mapreduce.output.fileoutputformat.compress.type"
// have no impact because it uses table properties to store compression information.
Member

Although this is the existing behavior, could you investigate how Hive behaves when parquet.compression is set (https://issues.apache.org/jira/browse/HIVE-7858)? Is it the same as ORC?

Contributor Author

Sure, I'll do it in the next few days.

Contributor Author (commented Jan 23, 2018)

For Parquet, using a Hive client, parquet.compression has a higher priority than mapreduce.output.fileoutputformat.compress, and table-level compression (set by TBLPROPERTIES) has the highest priority. parquet.compression set from the CLI also has a higher priority than mapreduce.output.fileoutputformat.compress.

After this PR, the priority does not change. If table-level compression is set, other compression settings do not take effect even when mapreduce.output.... is set, which is the same as Hive. But parquet.compression set from the Spark CLI does not take effect unless hive.exec.compress.output is set to true. This may be because we do not read parquet.compression from the session, and I wonder whether that is necessary, since we have spark.sql.parquet.compression.codec instead.

For ORC, hive.exec.compress.output and mapreduce.output.... really have no impact, but table-level compression (set by TBLPROPERTIES) always takes effect. orc.compression set from the Spark CLI does not take effect either, even when hive.exec.compress.output is set to true, which is different from Parquet.
Another question: the comment says it uses table properties to store compression information, but by manual test I found that ORC tables can also have mixed compressions and the data can still be read together correctly, so maybe I do not fully understand what the comment means.

My Hive version for this test is 1.1.0. Actually it is a little difficult for me to get a runnable Hive client of a higher version.

Member

The comment might not be correct now. We need to follow how the latest Hive works, if possible. The best way to try Hive (and the other RDBMSs) is using Docker. Maybe you can try Docker?

Contributor Author

Ok, I'll try it.

asfgit pushed a commit that referenced this pull request Sep 7, 2018
## What changes were proposed in this pull request?
Before Apache Spark 2.3, table properties were ignored when writing data to a Hive table (created with the STORED AS PARQUET/ORC syntax), because the compression configurations were not passed to the FileFormatWriter in hadoopConf. That was fixed in #20087. But for CTAS with the USING PARQUET/ORC syntax, table properties were still ignored when convertMetastore was enabled, so the test cases for CTAS were not supported.

Now that it has been fixed in #20522, the test cases should be enabled too.

## How was this patch tested?
This only re-enables the test cases of previous PR.

Closes #22302 from fjh100456/compressionCodec.

Authored-by: fjh100456 <fu.jinhua6@zte.com.cn>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 473f2fb)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>